{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Pyspark for Linear Regression"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 1. Set up spark context and SparkSession"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "from pyspark.sql import SparkSession\n",
    "\n",
    "spark = SparkSession \\\n",
    "    .builder \\\n",
    "    .appName(\"Python Spark SQL basic example\") \\\n",
    "    .config(\"spark.some.config.option\", \"some-value\") \\\n",
    "    .getOrCreate()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 2. Load dataset"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "df = spark.read.format('com.databricks.spark.csv').\\\n",
    "                               options(header='true', \\\n",
    "                               inferschema='true').load(\"./data/Advertising.csv\",header=True);"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- _c0: integer (nullable = true)\n",
      " |-- TV: double (nullable = true)\n",
      " |-- Radio: double (nullable = true)\n",
      " |-- Newspaper: double (nullable = true)\n",
      " |-- Sales: double (nullable = true)\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.take(2)\n",
    "df.printSchema()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---+-----+-----+---------+-----+\n",
      "|_c0|   TV|Radio|Newspaper|Sales|\n",
      "+---+-----+-----+---------+-----+\n",
      "|  1|230.1| 37.8|     69.2| 22.1|\n",
      "|  2| 44.5| 39.3|     45.1| 10.4|\n",
      "|  3| 17.2| 45.9|     69.3|  9.3|\n",
      "|  4|151.5| 41.3|     58.5| 18.5|\n",
      "|  5|180.8| 10.8|     58.4| 12.9|\n",
      "|  6|  8.7| 48.9|     75.0|  7.2|\n",
      "+---+-----+-----+---------+-----+\n",
      "only showing top 6 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.show(6)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 3. Convert the data to features (dense vector) and label"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "from pyspark.sql import Row\n",
    "from pyspark.ml.linalg import Vectors"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# convert the data to dense vector\n",
    "#def transData(row):\n",
    "#    return Row(label=row[\"Sales\"],\n",
    "#               features=Vectors.dense([row[\"TV\"],\n",
    "#                                       row[\"Radio\"],\n",
    "#                                       row[\"Newspaper\"]]))\n",
    "def transData(data):\n",
    "    return data.rdd.map(lambda r: [Vectors.dense(r[:-1]),r[-1]]).toDF(['features','label'])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# convert the data to dense vector\n",
    "#def transData(row):\n",
    "#    return Row(label=row[\"Sales\"],\n",
    "#               features=Vectors.dense([row[\"TV\"],\n",
    "#                                       row[\"Radio\"],\n",
    "#                                       row[\"Newspaper\"]]))\n",
    "def transData(data):\n",
    "    return data.rdd.map(lambda r: [Vectors.dense(r[:-1]),r[-1]]).toDF(['features','label'])"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 4. Transform the dataset to DataFrame"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------------------+-----+\n",
      "|            features|label|\n",
      "+--------------------+-----+\n",
      "|[1.0,230.1,37.8,6...| 22.1|\n",
      "|[2.0,44.5,39.3,45.1]| 10.4|\n",
      "|[3.0,17.2,45.9,69.3]|  9.3|\n",
      "|[4.0,151.5,41.3,5...| 18.5|\n",
      "|[5.0,180.8,10.8,5...| 12.9|\n",
      "| [6.0,8.7,48.9,75.0]|  7.2|\n",
      "+--------------------+-----+\n",
      "only showing top 6 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "#transformed = df.rdd.map(transData).toDF() \n",
    "data= transData(df)\n",
    "data.show(6)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 5. Convert features data format and set up training and test data sets"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "from pyspark.ml import Pipeline\n",
    "from pyspark.ml.regression import LinearRegression\n",
    "from pyspark.ml.feature import VectorIndexer\n",
    "from pyspark.ml.evaluation import RegressionEvaluator"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "featureIndexer = VectorIndexer(inputCol=\"features\", \\\n",
    "                               outputCol=\"indexedFeatures\",\\\n",
    "                               maxCategories=4).fit(data)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# Split the data into training and test sets (40% held out for testing)\n",
    "(trainingData, testData) = data.randomSplit([0.6, 0.4])"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "collapsed": false
   },
   "source": [
    "### 5. Fit model (Ridge Regression and the LASSO)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "lr = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 21,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "# Chain indexer and tree in a Pipeline\n",
    "pipeline = Pipeline(stages=[featureIndexer, lr])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 22,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# Train model.  This also runs the indexer.\n",
    "model = pipeline.fit(trainingData)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 30,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    " lrmodel= model.stages[1]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 31,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "DenseVector([0.0, 0.045, 0.1714, 0.0])"
      ]
     },
     "execution_count": 31,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "lrmodel.coefficients"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 29,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[DenseVector([0.0, 0.045, 0.1714, 0.0])]"
      ]
     },
     "execution_count": 29,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# or \n",
    "[stage.coefficients for stage in model.stages if hasattr(stage, \"coefficients\")]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 38,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "1.370089789983176"
      ]
     },
     "execution_count": 38,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "lrmodel.summary.meanAbsoluteError"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 6. Make predictions"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 25,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "predictions = model.transform(testData)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 26,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+------------------+-----+--------------------+\n",
      "|        prediction|label|            features|\n",
      "+------------------+-----+--------------------+\n",
      "|12.192355219183316|  7.2| [6.0,8.7,48.9,75.0]|\n",
      "| 11.62882026449108| 11.8|[7.0,57.5,32.8,23.5]|\n",
      "|12.187974870101684| 13.2|[8.0,120.2,19.6,1...|\n",
      "|4.1655336452224265|  4.8|   [9.0,8.6,2.1,1.0]|\n",
      "|12.856341613250077| 10.6|[10.0,199.8,2.6,2...|\n",
      "+------------------+-----+--------------------+\n",
      "only showing top 5 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "# Select example rows to display.\n",
    "predictions.select(\"prediction\", \"label\", \"features\").show(5)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 8. Evaluation"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 27,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Root Mean Squared Error (RMSE) on test data = 1.4827\n"
     ]
    }
   ],
   "source": [
    "# Select (prediction, true label) and compute test error\n",
    "evaluator = RegressionEvaluator(\n",
    "    labelCol=\"label\", predictionCol=\"prediction\", metricName=\"rmse\")\n",
    "rmse = evaluator.evaluate(predictions)\n",
    "print(\"Root Mean Squared Error (RMSE) on test data = %g\" % rmse)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "- summary"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "temp_path = 'temp/Users/wenqiangfeng/Dropbox/Spark/Code/model'\n",
    "modelPath = temp_path + \"/lr_model\"\n",
    "model.save(modelPath)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "- save and extract model"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "lr2 = model.load(modelPath)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "lr2.coefficients"
   ]
  }
 ],
 "metadata": {
  "anaconda-cloud": {},
  "kernelspec": {
   "display_name": "Python 2",
   "language": "python",
   "name": "python2"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 2
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython2",
   "version": "2.7.6"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 1
}
